Kafka as a Data Source

Configure Confluent Kafka as a Data Source

Apache Kafka is a distributed event store and stream-processing platform built on a publish-subscribe messaging model. A message is any kind of information sent from a producer (the application that sends messages) to a consumer (the application that receives them). Producers write their messages to Kafka topics, which are divided into partitions that function like logs. Each message is written to a partition and has an offset that identifies it uniquely within that partition. Consumers can specify the offset from which they begin to read messages.

To set up Confluent Kafka as a data source, you first need to generate Confluent Cloud API keys and retrieve the relevant bootstrap server address:

  1. Log in to your account at https://confluent.cloud/.

  2. Verify that you have created a cluster. In this example, the cluster is called dih-poc.

  3. Navigate to the API keys page.

  4. Click Add key.

  5. Enter the account, scope, and details required for the API key and click Create API Key.

    The API key download page displays the new API key and secret.

  6. Click Download API key to save the API key and secret. After you click Complete, the API secret is no longer available.

  7. Navigate to the Cluster Settings page.

  8. Copy the bootstrap server address.

The API keys and bootstrap server address can be used either in code or when creating a data source in SpaceDeck. SpaceDeck is the intuitive, streamlined GigaSpaces user interface for setting up, managing, and controlling the environment. Using SpaceDeck, users can define the tools to bring legacy System of Record (SoR) databases into the in-memory data grid that is the core of the GigaSpaces system.

For example, in Java code:

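import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Place the statements below inside a method of your application
Properties properties = new Properties();
properties.put("bootstrap.servers", "YOUR BOOTSTRAP SERVERS"); // Replace with your bootstrap servers
properties.put("sasl.mechanism", "PLAIN");
properties.put("security.protocol", "SASL_SSL");
// Replace USERNAME with the API key and PASSWORD with the API secret
properties.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";");
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
properties.put("group.id", "my-consumer-group");

// Create the Kafka consumer
Consumer<String, String> consumer = new KafkaConsumer<>(properties);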
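
For Python clients, the same connection settings can be sketched as a plain dictionary. This is a minimal illustration, not part of the GigaSpaces product: build_confluent_config is a hypothetical helper, the placeholder values are assumptions, and the property names follow the librdkafka-style configuration used by the confluent-kafka Python package, which is assumed to be installed separately.

```python
def build_confluent_config(bootstrap_servers, api_key, api_secret, group_id):
    """Return client settings for a Confluent Cloud cluster (hypothetical helper)."""
    if not (bootstrap_servers and api_key and api_secret):
        raise ValueError("bootstrap servers, API key, and API secret are all required")
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,      # Confluent Cloud API key
        "sasl.password": api_secret,   # Confluent Cloud API secret
        "group.id": group_id,
    }


# Placeholder values: replace them with the details retrieved in the steps above
config = build_confluent_config(
    "YOUR_BOOTSTRAP_SERVERS", "YOUR_API_KEY", "YOUR_API_SECRET", "my-consumer-group"
)

# With the confluent-kafka package installed (an assumption), a consumer
# could then be created like this:
#     from confluent_kafka import Consumer
#     consumer = Consumer(config)
print(sorted(config))
```

Keeping the settings in one function makes it harder to forget either half of the API key pair, because the helper rejects empty values before any connection is attempted.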

Create a Confluent Kafka Data Source

To create a Confluent Kafka data source:

  1. Create a space.

  2. Prepare a Python virtual environment (venv) for running the kafka_writer.py script (https://github.com/giga-di/di-lnp/blob/main/kafka-writer/kafka_writer.py):

    python -m venv venv
    source venv/bin/activate
    venv/bin/pip install -r requirements.txt
    
  3. Write messages to Kafka using the kafka_writer.py script (https://github.com/giga-di/di-lnp/blob/main/kafka-writer/kafka_writer.py), with commands like the following examples. In the first command, the complex argument generates messages with complex types. The script generates messages for demonstration only; your own messages can have a different structure.

    venv/bin/python kafka_writer.py http://localhost:8082 schema.complex 10 31 1000 complex
    venv/bin/python kafka_writer.py http://localhost:8082 schema.raw 0 10 1000 raw
    
  4. In SpaceDeck, under Data Source, create a new data source of type KAFKA.

  5. Enter the bootstrap server address retrieved previously.

  6. Select a table.

  7. Enter the API key details retrieved previously.

  8. Start a pipeline.

  9. Verify that data is sent to the space.

Kafka Message Schema Discovery

Kafka messages should be in JSON format. The schema is extracted automatically from the JSON messages.
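
To give a feel for what extracting a schema from JSON involves, here is a rough sketch that maps each top-level property of a message to a type name. It is not the GigaSpaces discovery algorithm: the infer_type and infer_schema functions and the type names they return are hypothetical, chosen only for illustration, and the actual types created in the space may differ.

```python
import json


def infer_type(value):
    """Return a hypothetical type name for a decoded JSON value."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        element_types = sorted({infer_type(item) for item in value})
        if not element_types:
            return "array<unknown>"  # an empty array carries no element type
        return "array<" + "|".join(element_types) + ">"
    if isinstance(value, dict):
        return "map"
    return "null"


def infer_schema(message):
    """Map each top-level property of a JSON message to an inferred type name."""
    record = json.loads(message)
    return {name: infer_type(value) for name, value in record.items()}


# A small sample that reuses property names from the message shown on this page
sample = json.dumps({
    "COUPONID": "BR2_000000000",
    "INT_ARRAY_PROP": [1, 2, 3, 4],
    "STR_EMPTY_ARRAY_PROP": [],
    "BZ00_BANK_MESHADER": 318.123456789,
    "MAPTEST": {"name": "Moshe", "city": "Jerusalem"},
})
schema = infer_schema(sample)
for name, type_name in schema.items():
    print(name, "->", type_name)
```

Note that values such as "2021-08-17" or "290440.62" are plain strings in JSON, so a sketch like this reports them as strings. Any richer typing (dates, decimals) would need additional rules that are not shown here.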

For example:

JSON message:

{
    "COUPONID": "BR2_000000000",
    "OFFERPROMOID": "candle_000",
    "EMAIL": "mail_000000001@gmail.com",
    "STR_EMPTY_ARRAY_PROP": [],
    "STR_ARRAY_PROP": [
        "abcd",
        "1234",
        "DDD1",
        "ASD±!@"
    ],
    "INT_EMPTY_ARRAY_PROP": [],
    "INT_ARRAY_PROP": [
        1,
        2,
        3,
        4
    ],
    "INT_AS_STRING_ARRAY_PROP": [
        "1",
        "2",
        "3",
        "4"
    ],
    "SHORT_EMPTY_ARRAY_PROP": [],
    "SHORT_ARRAY_PROP": [
        11,
        22,
        33,
        44
    ],
    "SHORT_AS_STRING_ARRAY_PROP": [
        "11",
        "22",
        "33",
        "44"
    ],
    "DOUBLE_EMPTY_ARRAY_PROP": [],
    "DOUBLE_ARRAY_PROP": [
        11.292,
        22.32323,
        33.4234234,
        44.543543
    ],
    "DOUBLE_AS_STRING_ARRAY_PROP": [
        "11.102",
        "22.3939",
        "33.3738",
        "44.00972"
    ],
    "LONG_EMPTY_ARRAY_PROP": [],
    "LONG_ARRAY_PROP": [
        11111,
        222222,
        33333,
        444444
    ],
    "LONG_AS_STRING_ARRAY_PROP": [
        "11",
        "22",
        "33",
        "44"
    ],
    "BIG_INTEGER_EMPTY_ARRAY_PROP": [],
    "BIG_INTEGER_ARRAY_PROP": [
        1111111111,
        222222222,
        33333333,
        444444444
    ],
    "BIG_INTEGER_AS_STRING_ARRAY_PROP": [
        "1111111",
        "222222",
        "33333333",
        "4444444444"
    ],
    "BIG_DECIMAL_EMPTY_ARRAY_PROP": [],
    "BIG_DECIMAL_ARRAY_PROP": [
        1111111111,
        222222222.43424323,
        33333333.123333,
        444444444.4343242
    ],
    "BIG_DECIMAL_AS_STRING_ARRAY_PROP": [
        "1111111.4234234",
        "222222.4324234",
        "33333333.434343",
        "4444444444.423432"
    ],
    "B_BZ00_TOKEN": 1000,
    "BZ00_TOKEN": 1000,
    "BZ00_TAR_RISHUM": "2021-08-17",
    "BZ00_ZMAN_RISHUM": "16:22:47",
    "BZ00_PEULA": 36,
    "BZ00_SNIF": 508,
    "BZ00_LAKOACH": 4741,
    "BZ00_ARUTZ": 44,
    "BZ00_BANK_MESHADER": 318.123456789,
    "BZ00_SNIF_MESHADER": 28,
    "BZ00_MAHADURA": "KKKK",
    "BZ00_SUG_CHESHBON": 14,
    "BZ00_CHESHBON": 28733,
    "BZ00_PEULA_ONLINE": 78,
    "BZ00_EZH_OFI_CHN": "BB",
    "BZ00_EZH_OFI_SHIUH": 52,
    "BZ00_EZH_SUG_LAK": "K",
    "BZ00_TAT_ARUTZ": "H",
    "BZ00_SHIUCH_CHATIV": 23,
    "BZ00_YAMNAL": "290440.62",
    "BZ00_ISKI_PRATI": 222,
    "BZ00_MIKUM_PEILUT": "QQQQQQQQQQQQQQQ",
    "BZ00_SW_AMLMAOF": 0,
    "BZ00_LAK_YACHID": "W",
    "BZ00_SW_NEKUDOT": "F",
    "BZ00_HIZD_PEULA": "DD",
    "BZ00_TAALICH_PEULA": "true",
    "MAPTEST": {
        "name": "Moshe",
        "contact": "+972-9876543210",
        "city": "Jerusalem"
    },
    "EMPTY_ARRAY_OF_MAPS_PROP": [],
    "ARRAY_OF_MAPS_PROP": [
        {
            "name": "Moshe",
            "contact": "+972-9876543210",
            "city": "Jerusalem"
        },
        {
            "name": "Yosi",
            "contact": "+972-9876540000",
            "city": "Herzlia"
        },
        {
            "name": "Jonatan",
            "contact": "+972-310101010",
            "city": "Haifa"
        }
    ],
    "LOCAL_DATE_EMPTY_ARRAY_PROP": [],
    "LOCAL_DATE_ARRAY_PROP": [
        "2021-08-18",
        "2021-09-19",
        "2021-10-11",
        "2021-12-01"
    ],
    "LOCAL_DATE_TIME_EMPTY_ARRAY_PROP": [],
    "LOCAL_DATE_TIME_ARRAY_PROP": [
        "2021-08-18T17:39:34.556153000",
        "2021-09-19T17:39:34.556153000",
        "2021-10-11T17:39:34.556153000",
        "2021-12-01T17:39:34.556153000"
    ],
    "LOCAL_TIME_EMPTY_ARRAY_PROP": [],
    "LOCAL_TIME_ARRAY_PROP": [
        "08:00:34",
        "17:09:00",
        "17:39:01",
        "17:39:35"
    ]
}

Type created in the space: